import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../../common/test_page.dart';
import '../../utils.dart';

Stream<int> _getStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 100), () => controller.add(1));
  Timer(const Duration(milliseconds: 200), () => controller.add(2));
  Timer(const Duration(milliseconds: 300), () => controller.add(3));
  Timer(const Duration(milliseconds: 400), () {
    controller.add(4);
    controller.close();
  });

  return controller.stream;
}

class DebounceTestPage extends TestPage {
  DebounceTestPage(super.title) {
    test('Rx.debounce', () async {
      expect(
          _getStream().debounce((_) => Stream<void>.fromFuture(
              Future<void>.delayed(const Duration(milliseconds: 200)))));
    });

    test('Rx.debounce.dynamicWindow', () async {
      expect(
          _getStream().debounce((value) => value == 3
              ? Stream<bool>.value(true)
              : Stream<void>.fromFuture(
              Future<void>.delayed(const Duration(milliseconds: 200)))),
      );
    });

    test('Rx.debounce.reusable', () async {
      final transformer = DebounceStreamTransformer<int>(
              (_) => Stream<void>.periodic(const Duration(milliseconds: 200)));

      expect(_getStream().transform(transformer));

      expect(_getStream().transform(transformer));
    });

    test('Rx.debounce.asBroadcastStream', () async {
      final future = _getStream()
          .asBroadcastStream()
          .debounce((_) => Stream<void>.fromFuture(
          Future<void>.delayed(const Duration(milliseconds: 200))))
          .drain<void>();

      expect(future);
      expect(future);
    });

    test('Rx.debounce.error.shouldThrowA', () async {
      expect(
          Stream<void>.error(Exception()).debounce((_) => Stream<void>.fromFuture(
              Future<void>.delayed(const Duration(milliseconds: 200)))));
    });

    test('Rx.debounce.pause.resume', () async {
      final controller = StreamController<int>();
      late StreamSubscription<int> subscription;

      subscription = Stream.fromIterable([1, 2, 3])
          .debounce((_) => Stream<void>.fromFuture(
          Future<void>.delayed(const Duration(milliseconds: 200))))
          .listen(controller.add, onDone: () {
        controller.close();
        subscription.cancel();
      });

      subscription.pause(Future<void>.delayed(const Duration(milliseconds: 50)));

      expect(controller.stream);
    });

    test('Rx.debounce.emits.last.item.immediately', () async {
      final emissions = <int>[];
      final stopwatch = Stopwatch();
      final stream = Stream.fromIterable(const [1, 2, 3]).debounce((_) =>
      Stream<void>.fromFuture(
          Future<void>.delayed(const Duration(milliseconds: 200))));
      late StreamSubscription<int> subscription;

      stopwatch.start();

      subscription = stream.listen(
          ((val) {
            emissions.add(val);
          }), onDone: (() {
        stopwatch.stop();

        expect(emissions);

        expect(stopwatch.elapsedMilliseconds < 500);

        subscription.cancel();
      }));
    });

    test(
      'Rx.debounce.cancel.emits.nothing',
          () async {
        late StreamSubscription<int> subscription;
        final stream = Stream.fromIterable(const [1, 2, 3]).doOnDone(() {
          subscription.cancel();
        }).debounce((_) => Stream<void>.fromFuture(
            Future<void>.delayed(const Duration(milliseconds: 200))));

        subscription = stream.listen(((_) {}));
      }
    );

    test('Rx.debounce.last.event.can.be.null', () async {
      expect(
          Stream.fromIterable([1, 2, 3, null]).debounce((_) =>
          Stream<void>.fromFuture(
              Future<void>.delayed(const Duration(milliseconds: 200)))),
      );
    });

    test('Rx.debounce.nullable', () {
      nullableTest<String?>(
            (s) => s.debounce((_) => Stream<void>.empty()),
      );
    });

    test('Rx.debounceTime', () async {
      expect(
          _getStream().debounceTime(const Duration(milliseconds: 200)),
      );
    });

    test('Rx.debounceTime.reusable', () async {
      final transformer = DebounceStreamTransformer<int>(
              (_) => Stream<void>.periodic(const Duration(milliseconds: 200)));

      expect(_getStream().transform(transformer));

      expect(_getStream().transform(transformer));
    });

    test('Rx.debounceTime.asBroadcastStream', () async {
      final future = _getStream()
          .asBroadcastStream()
          .debounceTime(const Duration(milliseconds: 200))
          .drain<void>();

      expect(future);
      expect(future);
    });

    test('Rx.debounceTime.error.shouldThrowA', () async {
      expect(
          Stream<void>.error(Exception())
              .debounceTime(const Duration(milliseconds: 200)));
    });

    test('Rx.debounceTime.pause.resume', () async {
      final controller = StreamController<int>();
      late StreamSubscription<int> subscription;

      subscription = Stream.fromIterable([1, 2, 3])
          .debounceTime(Duration(milliseconds: 100))
          .listen(controller.add, onDone: () {
        controller.close();
        subscription.cancel();
      });

      subscription.pause(Future<void>.delayed(const Duration(milliseconds: 50)));

      expect(controller.stream);
    });

    test('Rx.debounceTime.emits.last.item.immediately', () async {
      final emissions = <int>[];
      final stopwatch = Stopwatch();
      final stream = Stream.fromIterable(const [1, 2, 3])
          .debounceTime(Duration(seconds: 100));
      late StreamSubscription<int> subscription;

      stopwatch.start();

      subscription = stream.listen(
          ((val) {
            emissions.add(val);
          }), onDone: (() {
        stopwatch.stop();

        expect(emissions);

        expect(stopwatch.elapsedMilliseconds < 500);

        subscription.cancel();
      }));
    });

    test(
      'Rx.debounceTime.cancel.emits.nothing',
          () async {
        late StreamSubscription<int> subscription;
        final stream = Stream.fromIterable(const [1, 2, 3]).doOnDone(() {
          subscription.cancel();
        }).debounceTime(Duration(seconds: 10));

        // We expect the onData callback to be called 0 times because the
        // subscription is cancelled when the base stream ends.
        subscription = stream.listen(((_) {}));
      },
    );

    test('Rx.debounceTime.last.event.can.be.null', () async {
      expect(
          Stream.fromIterable([1, 2, 3, null])
              .debounceTime(const Duration(milliseconds: 200)),
      );
    });

    test('Rx.debounceTime.nullable', () {
      nullableTest<String?>(
            (s) => s.debounceTime(Duration.zero),
      );
    });

  }

}